Course: MLOps engineering
Author: Firas Jolha
The modeling phase is the ML-specific part of the process. This phase aims to specify one or several machine learning models to be deployed in production. The translation of the business problem into an ML task depends on what we are trying to solve. Constraints and requirements from the Business and Data Understanding phase shape this phase. For example, the application domain’s model assessment metrics might include performance metrics, robustness, fairness, scalability, interpretability, model complexity, and model resource demand. We should adjust the importance of each of these metrics according to the use case.
Generally, the modeling phase includes model selection, model specialization, and model training tasks. Additionally, depending on the application, we might use a pre-trained model, compress the model, or apply ensemble learning methods to get the final ML model.
One main complaint about machine learning projects is the lack of reproducibility. Therefore, we should ensure that the method and the results of the modeling phase are reproducible by collecting the model training metadata. Typically, we collect the following metadata: algorithm, training, validation, and testing datasets, hyperparameters, and a description of the runtime environment. Reproducibility of results also assumes validating the model’s mean performance across different random seeds. Following best practices, documenting trained models increases transparency and explainability in ML projects. A helpful framework here is the “Model Cards Toolkit”.
Many phases in ML development are iterative. Sometimes, we might need to revisit the business goals, KPIs, and available data from the previous steps to adjust the ML model results. Finally, we package the ML workflow in a pipeline to make model training repeatable during the modeling phase.
MLflow, at its core, provides a suite of tools aimed at simplifying the ML workflow. It is tailored to assist ML practitioners throughout the various stages of ML development and deployment.
MLflow’s functionalities are rooted in several foundational components. The core concepts behind its typical use cases are the following:
Runs
are executions of some piece of data science code, for example, a single python train.py execution. Each run records metadata (information about your run, such as metrics, parameters, and start and end times) and artifacts (output files from the run, such as model weights, images, etc.).
Experiments
An experiment groups together runs for a specific task. Experiments are logical containers for your runs. You can create an experiment using the CLI, API, or UI.
Model
An MLflow Model is created from an experiment or run that is logged with one of the model flavor’s mlflow.<model_flavor>.log_model() methods. Once logged, this model can then be registered with the Model Registry.
Registered Model
An MLflow Model can be registered with the Model Registry. A registered model has a unique name, contains versions, aliases, tags, and other metadata.
Model Version
Each registered model can have one or many versions. When a new model is added to the Model Registry, it is added as version 1. Each new model registered to the same model name increments the version number.
Model Alias
Model aliases allow you to assign a mutable, named reference to a particular version of a registered model. By assigning an alias to a specific model version, you can use the alias to refer to that model version via a model URI or the model registry API.
You can easily install MLflow as a Python package:
pip install mlflow
This will install the mlflow package and the mlflow command.
The MLflow command-line interface (CLI) provides a simple interface to various MLflow functionality. Here I show some of the commands:
mlflow server -h localhost -p 5000
mlflow experiments
mlflow experiments create --experiment-name "cli-exp"
mlflow experiments delete --experiment-id 868669161979377932
mlflow experiments restore --experiment-id 868669161979377932
# Only active experiments (default)
mlflow experiments search -v active_only
# Only deleted experiments
mlflow experiments search -v deleted_only
# All experiments
mlflow experiments search -v all
mlflow runs
mlflow runs list --experiment-id "868669161979377932"
mlflow runs describe --run-id "583005f6b9a847c4ac7448990d54df09"
mlflow runs delete --run-id "583005f6b9a847c4ac7448990d54df09"
mlflow runs restore --run-id "583005f6b9a847c4ac7448990d54df09"
mlflow artifacts list --run-id "583005f6b9a847c4ac7448990d54df09"
# Download run artifacts to a local directory, e.g. /tmp/my-artifact
# using run-id
mlflow artifacts download --run-id '583005f6b9a847c4ac7448990d54df09' -d '/tmp/my-artifact'
# using artifact-uri
mlflow artifacts download --artifact-uri 'runs:/583005f6b9a847c4ac7448990d54df09/iris_model' -d '/tmp/my-artifact'
mlflow artifacts log-artifact -l requirements.txt --run-id '583005f6b9a847c4ac7448990d54df09'
# Print useful information for diagnosing and debugging MLflow issues
mlflow doctor
# Permanently delete runs that are in the deleted lifecycle stage
mlflow gc
When you delete an experiment or run, its registered models are not deleted. If you want to delete the models too, you need to delete them manually.
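As a sketch of such manual cleanup, the Model Registry client API can remove a single model version or a whole registered model (assuming here a registered model named first_model, as in the example later in this tutorial):
from mlflow import MlflowClient

client = MlflowClient()
# Delete a single version of the registered model
client.delete_model_version(name="first_model", version="1")
# Or delete the registered model together with all of its versions
client.delete_registered_model(name="first_model")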
ML experimentation is one of the core practices in the model engineering phase. ML experiments are usually done to optimize ML models, try new methods, test hypotheses, etc. We need a system that helps us track these experiments and manage our ML workflows, so that we can go back and check our progress. MLflow is one of the common tools used to track and log ML models and experiments.
Assume we have the following code snippet for our ML modeling in short_ml.py:
# Iris data sets consists of 3 different types of irises’
# (Setosa, Versicolour, and Virginica) petal and sepal
# length, stored in a 150x4 numpy.ndarray
# The rows being the samples and the columns being:
# Sepal Length, Sepal Width, Petal Length and Petal Width.
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
# Load the Iris dataset
X, y = datasets.load_iris(return_X_y=True)
# Split the data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# Define the model hyperparameters
params = {
"solver": "lbfgs",
"penalty": "l2",
"random_state": 8888,
}
# Train the model
lr = LogisticRegression(**params)
lr.fit(X_train, y_train)
# Predict on the test set
y_pred = lr.predict(X_test)
# Calculate metrics
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred, average="macro")
recall = recall_score(y_test, y_pred, average="macro")
f1 = f1_score(y_test, y_pred, average="macro")
print(accuracy, precision, recall, f1)
We can track experiments in MLflow for the previous code snippet as follows:
# mlflow server --host localhost --port 5000
mlflow server
# mlflow ui
This will start the MLflow tracking web server on port 5000 of localhost by default.
We do not need to set the tracking URI explicitly in code unless you are using a different port or hostname than the defaults:
import mlflow
# mlflow.set_tracking_uri(uri="http://localhost:5000")
# Alternatively, set the MLFLOW_TRACKING_URI environment variable
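For example, a minimal sketch of providing the tracking URI through the environment instead of in code (assuming the server above runs on localhost:5000):
import os
import mlflow

# Equivalent to calling mlflow.set_tracking_uri("http://localhost:5000") in code
os.environ["MLFLOW_TRACKING_URI"] = "http://localhost:5000"
print(mlflow.get_tracking_uri())  # should print http://localhost:5000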
The following is the code we want to use for training and evaluating our model. In this code, we are not yet tracking ML experiments.
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
# Load the Iris dataset
X, y = datasets.load_iris(return_X_y=True)
# Split the data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# Define the model hyperparameters
params = {
"solver": "lbfgs",
"penalty": "l2",
"random_state": 8888,
}
# Train the model
lr = LogisticRegression(**params)
lr.fit(X_train, y_train)
# Predict on the test set
y_pred = lr.predict(X_test)
# Calculate metrics
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred, average="macro")
recall = recall_score(y_test, y_pred, average="macro")
f1 = f1_score(y_test, y_pred, average="macro")
print(accuracy, precision, recall, f1)
The steps are as follows:
We log the model and metadata after we finish training the model. So, do not include logging code inside training code.
import mlflow
from mlflow.models import infer_signature
import mlflow.sklearn
import mlflow.exceptions
# Set our tracking server uri for logging
# mlflow.set_tracking_uri(uri = "http://localhost:5000")
experiment_name = "MLflow-experiment-01"
try:
# Create a new MLflow Experiment
experiment_id = mlflow.create_experiment(name=experiment_name)
except mlflow.exceptions.MlflowException as e:
experiment_id = mlflow.get_experiment_by_name(experiment_name).experiment_id
print(experiment_id)
# Start an MLflow run
with mlflow.start_run(run_name="run-01", experiment_id=experiment_id) as run:
# Log the hyperparameters
mlflow.log_params(params=params)
# Log the performance metrics
mlflow.log_metric("accuracy", accuracy) # type: ignore
mlflow.log_metric("f1", f1) # type: ignore
mlflow.log_metrics({
"accuracy": accuracy,
"f1": f1
})
# Set a tag that we can use to remind ourselves what this run was for
mlflow.set_tag("Training Info", "Basic LR model for my data")
# Infer the model signature
signature = infer_signature(X_test, y_test)
# Log the model
model_info = mlflow.sklearn.log_model(
sk_model=lr,
artifact_path="LR_model",
signature=signature,
input_example=X_test,
registered_model_name="first_model"
)
In MLflow, a model signature precisely defines the schema for model inputs, outputs, and any additional parameters required for effective model operation.
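As an illustration, a signature can also be inferred from the model’s actual predictions instead of y_test; a small sketch reusing the variables defined above:
from mlflow.models import infer_signature

# Infer the input schema from X_test and the output schema from the model's predictions
signature = infer_signature(X_test, lr.predict(X_test))
print(signature)  # prints the inferred input and output schemas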
Notes:
- You can log standalone files to a run with the mlflow.log_artifact(local_path, artifact_path, run_id) function. Generally, you can use this function to log any standalone files you prefer, like images, text files, etc.
- Push your code to the main branch whenever you are ready to deploy your model, since pushing a commit to the main branch will trigger the CI/CD workflows which will be built in Phase 6 of this project.
- You can load the logged model as a generic Python function (pyfunc) and use it for inference.

The python_function (pyfunc) model flavor serves as a default model interface for MLflow Python models. Any MLflow Python model is expected to be loadable as a python_function model. This enables other MLflow tools to work with any Python model regardless of which persistence module or framework was used to produce the model.
import pandas as pd
# Load the model back for predictions as a generic Python Function model flavor
loaded_model = mlflow.pyfunc.load_model(model_info.model_uri)
# Run predictions
predictions = loaded_model.predict(X_test)
iris_feature_names = datasets.load_iris().feature_names
# Compare some prediction results
result = pd.DataFrame(X_test, columns=iris_feature_names)
result["actual_class"] = y_test
result["predicted_class"] = predictions
result[:4]
You can run the file as a normal Python file using the python command.
After you run the file, you will notice two experiments: the one we created in the code and the Default experiment.
Locally, MLflow uses the mlruns folder as the backend store and model registry. It will store all artifacts there.
If you are getting an error like this:
RESOURCE_DOES_NOT_EXIST: Could not find experiment with ID 0
That means you deleted the Default experiment, whose id is 0 and which is used to hold any runs executed without a specific experiment. In order to fix this issue, you have to create a new experiment named Default and set its id to 0 manually.
(.venv) firasj@Lenovo:~/project$ mlflow experiments create -n Default
Created experiment 'Default' with id 529278589677091129
Then rename the folder mlruns/529278589677091129 to mlruns/0 and change its metadata (the meta.yaml file inside it) as follows:
# The path is different on your machine but it should end with `mlruns/0`
artifact_location: file:///home/firasj/project/mlruns/0
creation_time: 1720372282876 # Do not change this
experiment_id: '0' # Change this
last_update_time: 1720372282876 # Do not change this
lifecycle_stage: active # Do not change this
# Lifecycle stage of the experiment. Can either be ‘active’ or ‘deleted’.
name: Default # Do not change this
In order to see the results of our run, we can navigate to the MLflow UI. Since we have already started the Tracking Server at http://localhost:5000, we can simply navigate to that URL in our browser. When opening the site, you will see a screen similar to the following:
Clicking on the name of the Experiment that we created (“MLflow experiment 01”) will give us a list of runs associated with the Experiment. You should see a random name that has been generated for the run and nothing else showing up in the Table list view to the right.
Clicking on the name of the run will take you to the Run page, where the details of what we’ve logged will be shown. The elements have been highlighted below to show how and where this data is recorded within the UI.
You can also access all of the functions in the Tracking UI programmatically with MlflowClient. For example, the following code snippet searches for the run that has the best validation loss among all runs in the experiment.
client = mlflow.tracking.MlflowClient()
experiment_id = "0"
best_run = client.search_runs(
experiment_id, order_by=["metrics.val_loss ASC"], max_results=1
)[0]
print(best_run.info)
# {'run_id': '...', 'metrics': {'val_loss': 0.123}, ...}
mlflow API
The mlflow module provides a high-level “fluent” API for starting and managing MLflow runs. For example:
import mlflow
# Start the run
mlflow.start_run()
mlflow.log_param("my", "param")
mlflow.log_metric("score", 100)
mlflow.end_run()
You can also use the context manager syntax like this:
with mlflow.start_run() as run:
mlflow.log_param("my", "param")
mlflow.log_metric("score", 100)
which automatically terminates the run at the end of the with block.
The fluent tracking API is not currently thread-safe. Any concurrent callers to the tracking API must implement mutual exclusion manually.
mlflow.client API
The mlflow.client module provides a Python CRUD interface to MLflow Experiments, Runs, Model Versions, and Registered Models. This is a lower-level API that directly translates to MLflow REST API calls. Some of the key functions of this API are demonstrated as follows:
from mlflow import MlflowClient
import mlflow
# Client of an MLflow Tracking Server that creates and manages experiments and runs, and of an MLflow Registry Server that creates and manages registered models and model versions.
client = MlflowClient()
experiment_name = "my experiment"
# Create an experiment
try:
# Create a new MLflow Experiment
experiment_id = mlflow.create_experiment(name=experiment_name)
except mlflow.exceptions.MlflowException as e:
experiment_id = mlflow.get_experiment_by_name(experiment_name).experiment_id
# Create a mlflow.entities.Run object that can be associated with metrics, parameters, artifacts, etc. Unlike mlflow.projects.run(), creates objects but does not run code. Unlike mlflow.start_run(), does not change the “active run” used by mlflow.log_param().
run = client.create_run(experiment_id =experiment_id, run_name = "basic run")
run_id = run.info.run_id
model_name = "LR model"
model_path = model_name
# Create a new registered model in backend store.
rm = client.create_registered_model(name = model_name, description = "First LR model")
print(f"name: {rm.name}")
print(f"tags: {rm.tags}")
print(f"description: {rm.description}")
model_uri = f"runs:/{run_id}/{model_path}"
mv = client.create_model_version(name = model_name, source=model_uri, run_id = run_id)
print(f"Name: {mv.name}")
print(f"Version: {mv.version}")
print(f"Description: {mv.description}")
print(f"Status: {mv.status}")
print(f"Stage: {mv.current_stage}")
# Delete model version in backend.
client.delete_model_version(name = mv.name, version = mv.version)
# Delete registered model. Backend raises exception if a registered model with given name does not exist.
client.delete_registered_model(name = rm.name)
# Deletes a run with the given ID.
client.delete_run(run_id = run_id)
# Delete an experiment from the backend store. This deletion is a soft-delete, not a permanent deletion.
client.delete_experiment(experiment_id = experiment_id)
Sometimes you want to launch multiple MLflow runs in the same program: for example, maybe you are performing a hyperparameter search locally or your experiments are just very fast to run. The way to do this depends on whether you want to run them sequentially or in parallel.
# First run
with mlflow.start_run():
mlflow.log_param("x", 1)
mlflow.log_metric("y", 2)
...
# Another run
with mlflow.start_run():
...
MLflow also supports running multiple runs in parallel using multiprocessing or multithreading.
import mlflow
import multiprocessing as mp
def train_model(params):
with mlflow.start_run():
mlflow.log_param("p", params)
...
if __name__ == "__main__":
params = [0.01, 0.02, ...]
pool = mp.Pool(processes=4)
pool.map(train_model, params)
import mlflow
import threading
def train_model(params):
# Create a child run by passing nested=True
with mlflow.start_run(nested=True):
mlflow.log_param("p", params)
...
if __name__ == "__main__":
params = [0.01, 0.02, ...]
threads = []
for p in params:
t = threading.Thread(target=train_model, args=(p,))
threads.append(t)
t.start()
for t in threads:
t.join()
You can also create multiple runs inside a single run. This is useful for scenarios like hyperparameter tuning or cross-validation folds, where you need another level of organization within an experiment. You can create child runs by passing nested=True (or a parent_run_id) to the mlflow.start_run() function.
# Start parent run
with mlflow.start_run() as parent_run:
param = [0.01, 0.02, 0.03]
# Create a child run for each parameter setting
for p in param:
with mlflow.start_run(nested=True) as child_run:
mlflow.log_param("p", p)
...
mlflow.log_metric("val_loss", val_loss)
You can fetch all child runs under a parent run using tags. The MlflowClient.set_tag() function lets you add custom tags to runs. A tag can only have a single unique value mapped to it at a time. For example:
client.set_tag(run.info.run_id, "tag_key", "tag_value")
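In addition to custom tags, nested runs automatically receive the built-in tag mlflow.parentRunId, so the children of a parent run can be fetched by filtering on that tag. A minimal sketch, assuming the parent/child runs from the example above:
import mlflow

parent_run_id = parent_run.info.run_id  # parent_run from the nested-runs example above
child_runs = mlflow.search_runs(
    filter_string=f"tags.mlflow.parentRunId = '{parent_run_id}'"
)
# child_runs is a pandas DataFrame with one row per child run
print(child_runs[["run_id", "params.p", "metrics.val_loss"]])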
Autologging automatically logs your model, metrics, examples, signature, and parameters with only a single line of code for many of the most popular ML libraries in the Python ecosystem.
import mlflow
# Enable autologging
mlflow.autolog()
# Your ML modeling code is here.
You should call mlflow.autolog() before your training code.
In some cases, you may want to access the MLflow Run instance associated with the autologged results. You can access the most recent autolog run through the mlflow.last_active_run() function.
import mlflow
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_diabetes
from sklearn.ensemble import RandomForestRegressor
mlflow.autolog()
db = load_diabetes()
X_train, X_test, y_train, y_test = train_test_split(db.data, db.target)
# Create and train models.
rf = RandomForestRegressor(n_estimators=100, max_depth=6, max_features=3)
rf.fit(X_train, y_train)
# Use the model to make predictions on the test dataset.
predictions = rf.predict(X_test)
autolog_run = mlflow.last_active_run()
print(autolog_run)
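Autologging can also be configured globally or disabled for a specific framework; a small sketch (the keyword arguments below are standard options of the autolog functions):
import mlflow
import mlflow.sklearn

# Configure global autologging: skip logging the fitted models, but log input examples
mlflow.autolog(log_models=False, log_input_examples=True)

# Disable autologging only for scikit-learn while keeping it enabled for other libraries
mlflow.sklearn.autolog(disable=True)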
There are two main abstract components associated with the mlflow.data module: Dataset and DatasetSource. The Dataset abstraction is a metadata tracking object that holds the information about a given logged dataset; concrete implementations include mlflow.data.pandas_dataset.PandasDataset and mlflow.data.tensorflow_dataset.TensorFlowDataset. The DatasetSource component of a Dataset represents the source of a dataset, such as a directory in S3, a Delta Table, or a URL.
The following example demonstrates how to construct a mlflow.data.pandas_dataset.PandasDataset object from a Pandas DataFrame:
import mlflow.data
import pandas as pd
from mlflow.data.pandas_dataset import PandasDataset
dataset_source_url = "https://raw.githubusercontent.com/mlflow/mlflow/master/tests/datasets/winequality-white.csv"
raw_data = pd.read_csv(dataset_source_url, delimiter=";")
# Create an instance of a PandasDataset
dataset = mlflow.data.from_pandas(
raw_data, source=dataset_source_url, name="wine quality - white", targets="quality"
)
For this example, you need to install the xgboost package as follows:
pip install xgboost
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import xgboost
import mlflow
from mlflow.data.pandas_dataset import PandasDataset
dataset_source_url = "https://raw.githubusercontent.com/mlflow/mlflow/master/tests/datasets/winequality-white.csv"
raw_data = pd.read_csv(dataset_source_url, delimiter=";")
# Extract the features and target data separately
y = raw_data["quality"]
X = raw_data.drop("quality", axis=1)
# Split the data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.33, random_state=17
)
# Create a label encoder object
le = LabelEncoder()
# Fit and transform the target variable
y_train_encoded = le.fit_transform(y_train)
y_test_encoded = le.transform(y_test)
# Fit an XGBoost binary classifier on the training data split
model = xgboost.XGBClassifier().fit(X_train, y_train_encoded)
# Build the Evaluation Dataset from the test set
y_test_pred = model.predict(X=X_test)
eval_data = X_test
eval_data["label"] = y_test
# Assign the decoded predictions to the Evaluation Dataset
eval_data["predictions"] = le.inverse_transform(y_test_pred)
# Create the PandasDataset for use in mlflow evaluate
pd_dataset = mlflow.data.from_pandas(
eval_data, predictions="predictions", targets="label"
)
mlflow.set_experiment("White Wine Quality")
# Log the Dataset, model, and execute an evaluation run using the configured Dataset
with mlflow.start_run() as run:
mlflow.log_input(pd_dataset, context="training")
mlflow.xgboost.log_model(
artifact_path="white-wine-xgb", xgb_model=model, input_example=X_test
)
result = mlflow.evaluate(data=pd_dataset, predictions=None, model_type="classifier")
# Retrieve the run information
logged_run = mlflow.get_run(run.info.run_id)
# Retrieve the Dataset object
logged_dataset = logged_run.inputs.dataset_inputs[0].dataset
# View some of the recorded Dataset information
print(f"Dataset name: {logged_dataset.name}")
print(f"Dataset digest: {logged_dataset.digest}")
print(f"Dataset profile: {logged_dataset.profile}")
print(f"Dataset schema: {logged_dataset.schema}")
The Tracking UI lets you visually explore your experiments and runs. It allows you to list and compare runs across experiments, search for runs by parameter or metric value, visualize run metrics, and download run artifacts and metadata for further analysis.
The artifact store persists (typically large) artifacts for each run, such as model weights (e.g. a pickled scikit-learn model), images (e.g. PNGs), and model and data files (e.g. Parquet files). MLflow stores artifacts in a local directory (mlruns) by default, but it also supports other storage options such as Amazon S3 and Azure Blob Storage.
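For instance, an experiment can be pointed at a remote artifact location when it is created; a minimal sketch, assuming a hypothetical S3 bucket and that the required credentials (and boto3) are available:
import mlflow

# artifact_location overrides the default local ./mlruns artifact store for this experiment
experiment_id = mlflow.create_experiment(
    name="s3-backed-experiment",                          # hypothetical experiment name
    artifact_location="s3://my-mlflow-bucket/artifacts",  # hypothetical bucket/prefix
)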
An MLflow Model is a standard format for packaging machine learning models that can be used in a variety of downstream tools—for example, real-time serving through a REST API. The format defines a convention that lets you save a model in different “flavors” that can be understood by different downstream tools.
Each MLflow Model is a directory containing arbitrary files, together with an MLmodel file in the root of the directory that can define multiple flavors in which the model can be viewed.
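The flavors declared in the MLmodel file can also be inspected programmatically; a small sketch, assuming model_info returned by one of the log_model calls above:
from mlflow.models import get_model_info

info = get_model_info(model_info.model_uri)
# For a scikit-learn model, this typically includes the "python_function" and "sklearn" flavors
print(info.flavors)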
import mlflow
import pandas as pd
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from mlflow.models import infer_signature
import mlflow.sklearn
import mlflow.exceptions
# Load the Iris dataset
X, y = datasets.load_iris(return_X_y=True)
# Split the data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# Define the model hyperparameters
params = {
"solver": "lbfgs",
"max_iter": 1000, # Use hydra for configuration management
"random_state": 8888,
}
# Train the model
lr = LogisticRegression(**params)
lr.fit(X_train, y_train)
# Predict on the test set
y_pred = lr.predict(X_test)
# Calculate metrics
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred, average="macro")
recall = recall_score(y_test, y_pred, average="macro")
f1 = f1_score(y_test, y_pred, average="macro")
print(accuracy, precision, recall, f1)
experiment_name = "MLflow experiment 01"
run_name = "run 01"
try:
# Create a new MLflow Experiment
experiment_id = mlflow.create_experiment(name=experiment_name)
except mlflow.exceptions.MlflowException as e:
experiment_id = mlflow.get_experiment_by_name(experiment_name).experiment_id
print(experiment_id)
with mlflow.start_run(run_name=run_name, experiment_id=experiment_id) as run:
# Log the hyperparameters
mlflow.log_params(params=params)
# Log the performance metrics
mlflow.log_metric("accuracy", accuracy)
mlflow.log_metric("f1", f1)
mlflow.log_metrics({
"accuracy": accuracy,
"f1": f1
})
# Set a tag that we can use to remind ourselves what this run was for
mlflow.set_tag("Training Info", "Basic LR model for iris data")
# Infer the model signature
signature = infer_signature(X_test, y_test)
# Log the model
model_info = mlflow.sklearn.log_model(
sk_model=lr,
artifact_path="iris_model",
signature=signature,
input_example=X_test,
registered_model_name="LR_model_01",
pyfunc_predict_fn = "predict_proba"
)
sk_pyfunc = mlflow.sklearn.load_model(model_uri=model_info.model_uri)
predictions = sk_pyfunc.predict(X_test)
print(predictions)
eval_data = pd.DataFrame(y_test)
eval_data.columns = ["label"]
eval_data["predictions"] = predictions
results = mlflow.evaluate(
data=eval_data,
model_type="classifier",
targets= "label",
predictions="predictions",
evaluators = ["default"]
)
print(f"metrics:\n{results.metrics}")
print(f"artifacts:\n{results.artifacts}")
import numpy as np
import mlflow
from mlflow.models import infer_signature
import torch
from torch import nn
import pandas as pd
net = nn.Linear(10, 1)
loss_function = nn.L1Loss()
optimizer = torch.optim.Adam(net.parameters(), lr=1e-4)
X = torch.randn(100, 10)
y = torch.randn(100, 1)
print(X.shape, y.shape)
epochs = 5
for epoch in range(epochs):
optimizer.zero_grad()
outputs = net(X)
loss = loss_function(outputs, y)
loss.backward()
optimizer.step()
with mlflow.start_run() as run:
signature = infer_signature(X.numpy(), net(X).detach().numpy())
model_info = mlflow.pytorch.log_model(
pytorch_model = net,
artifact_path = "pytorch model",
signature=signature,
input_example=X.numpy(),
registered_model_name="pytorch_model"
)
pytorch_pyfunc = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)
X_test = torch.randn(20, 10).numpy()
predictions = pytorch_pyfunc.predict(X_test)
print(predictions)
eval_data = pd.DataFrame(y.numpy())
print(eval_data)
eval_data.columns = ["label"]
eval_data["predictions"] = net(X).detach().numpy()
print(eval_data.shape)
results = mlflow.evaluate(
data=eval_data,
model_type="regressor",
targets= "label",
predictions="predictions",
evaluators = ["default"]
)
print(f"metrics:\n{results.metrics}")
print(f"artifacts:\n{results.artifacts}")
We have two schemes of model URIs to retrieve models, as follows:
The runs scheme:
runs:/<run_id>/<model_artifact_path>
The models scheme:
# Fetch a specific model version
models:/<model_name>/<version>
# Fetch a model version by alias
models:/<model_name>@alias
We can use mlflow.pyfunc.load_model to load any model that has the pyfunc flavor from the model registry as follows:
import mlflow.pyfunc
from mlflow import MlflowClient
run_id = "e389609f9f1b44678ea7fea020453f94"
model_artifact_path = "pytorch model"
model = mlflow.pyfunc.load_model(model_uri=f"runs:/{run_id}/{model_artifact_path}")
print(model.metadata)
# OR
model_name = "pytorch_model"
model_version = 1
model = mlflow.pyfunc.load_model(model_uri=f"models:/{model_name}/{model_version}")
print(model.metadata)
# OR
client = MlflowClient()
client.set_registered_model_alias(name = model_name, alias = "staging", version = "1")
model_name = "pytorch_model"
model_alias = "staging"
model = mlflow.pyfunc.load_model(model_uri=f"models:/{model_name}@{model_alias}")
print(model.metadata)
An MLflow Project is a format for packaging data science code in a reusable and reproducible way, based primarily on conventions. In addition, the Projects component includes an API and command-line tools for running projects, making it possible to chain together projects into workflows.
Each project is simply a directory of files, or a Git repository, containing your code. Each project can specify several properties:
- Entry points: commands that can be run within the project; you can call any .py or .sh file in the project as an entry point.
- Environment: the software environment used to run the entry points; MLflow supports virtualenv/conda environments, the local environment (--env-manager=local), and Docker containers.

MLproject file
You can get more control over an MLflow Project by adding an MLproject file, which is a text file in YAML syntax, to the project’s root directory. The following is an example of an MLproject file:
# MLproject
name: Predicting customer satisfaction
python_env: python_env.yaml
# or
# conda_env: my_env.yaml
# or
# docker_env:
# image: mlflow-docker-example
entry_points:
main:
command: "python src/main.py"
# python_env.yaml
# Python version required to run the project.
python: "3.11"
# Dependencies required to build packages. This field is optional.
build_dependencies:
- pip
# Dependencies required to run the project.
dependencies:
- mlflow==2.7.3
- scikit-learn
- pandas
- zenml
- dvc
- giskard
# Add the rest of your dependencies here
For the project, you either:
- add --env-manager=local to every mlflow run command, or
- install pyenv to let MLflow create a virtual environment, as follows:
pip install pyenv --upgrade
After that, add the pyenv binaries to the PATH environment variable as follows:
# Add the line
echo 'export PATH="$HOME/.pyenv/bin:$PATH"' >> ~/.bashrc
# load the file
source ~/.bashrc
# activate the virtual environment again
source .venv/bin/activate
Check that the file ~/.bashrc contains the line:
export PATH="$PATH:$HOME/.pyenv/bin"
When you do not want to run the MLflow project in a new virtual environment, but rather use the virtual environment of the local repository, you need to add the option --env-manager=local.
In the MLproject file, the command can be any string in Python format string syntax. MLflow allows specifying a data type and default value for each parameter:
parameter_name: data_type # Without default
parameter_name: {type: data_type, default: value} # Short syntax
parameter_name: # Long syntax
type: data_type
default: value
You can run any MLflow project from a Git URI or from a local directory using the mlflow run command-line tool (Method 1), or the mlflow.projects.run() Python API (Method 2).
Both tools take the following parameters:
- Project URI: a local directory or a Git URI of the form https://<repo> (to use HTTPS) or user@host:path (to use Git over SSH). To run against an MLproject file located in a subdirectory of the project, add a '#' to the end of the URI argument, followed by the relative path from the project's root directory to the subdirectory containing the desired project.
- Entry point: the name of the entry point to run, which defaults to main.
- Environment manager: by default, MLflow runs the project in its own environment; you can run it in the current system environment by supplying the --env-manager=local flag, but this can lead to unexpected results if there are dependency mismatches between the project environment and the current system environment.
mlflow run $PROJECTPATH -P alpha=0.5 -e main --env-manager local
import mlflow
import os
project_path = os.path.expandvars("$PROJECTPATH")
project_uri = project_path
params = {"alpha": 0.5, "l1_ratio": 0.01}
entry_point = "main"
env_manager = "local"
# Run MLflow project and create a reproducible conda environment
# on a local host
mlflow.projects.run(project_uri,
parameters=params,
entry_point=entry_point,
env_manager=env_manager)
mlflow run git@github.com:mlflow/mlflow-example.git -P alpha=0.5 -e main --env-manager local
import mlflow
project_uri = "https://github.com/mlflow/mlflow-example"
params = {"alpha": 0.5, "l1_ratio": 0.01}
entry_point = "main"
env_manager = "local"
# Run MLflow project and create a reproducible conda environment
# on a local host
mlflow.projects.run(project_uri,
parameters=params,
entry_point=entry_point,
env_manager=env_manager)
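As a side note, mlflow.projects.run() returns a SubmittedRun object that can be captured to inspect the launched run; a sketch reusing the variables defined above (executing it would launch the project one more time):
import mlflow

submitted = mlflow.projects.run(
    project_uri,
    parameters=params,
    entry_point=entry_point,
    env_manager=env_manager,
)
print(submitted.run_id)        # id of the MLflow run created for this project execution
print(submitted.get_status())  # e.g. "FINISHED" once the synchronous run completes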
By default, MLflow uses a new, temporary working directory for Git projects. This means that you should generally pass any file arguments to an MLflow project using absolute, not relative, paths. If your project declares its parameters, MLflow automatically makes paths absolute for parameters of type path.
When running an MLflow Project directory or repository that does not contain an MLproject file, MLflow uses the directory name as the project name and a Conda environment containing only the latest version of Python.
Runtime parameters are passed to the entry point on the command line using --key value syntax.
When we use Hydra for configuration management, it is easy to run multiple experiments with a single command line using the --multirun option.
python src/main.py --multirun
You can also activate the multirun mode of Hydra in config files rather than using the --multirun option, as follows:
# Add this to `configs/main.yaml`
hydra:
mode: MULTIRUN
Hydra will store the config results for multi-run experiments under the multirun folder.
By default, Hydra runs your multi-run jobs locally and serially. You can use the Joblib Launcher plugin, which provides a launcher for parallel tasks based on joblib.Parallel. It can be installed as a separate package as follows:
pip install hydra-joblib-launcher --upgrade
Add the following to your configs/main.yaml file:
# Override the default launcher
defaults:
- override hydra/launcher: joblib
# Set the number of parallel jobs
hydra:
launcher:
# override the number of jobs for joblib
n_jobs: 10 # it is 10 jobs in parallel now
After this change, Hydra will use the Joblib launcher whenever you use --multirun, as follows:
python src/app.py --multirun
Optuna is an open-source hyperparameter optimization framework that automates hyperparameter search. The Hydra Optuna Sweeper plugin can be installed as a separate package as follows:
pip install hydra-optuna-sweeper --upgrade
Add the following to your configs/main.yaml file:
# Override the default sweeper
defaults:
- override hydra/sweeper: optuna
# Configure the Optuna sweeper
hydra:
sweeper:
sampler:
seed: 8888
direction: maximize # minimize
study_name: LR_optimization
n_trials: 20 # number of times to try to optimize the search space
n_jobs: 1 # parallelism
params:
x: range(-5.5, 5.5, step=0.5)
y: choice(-5 ,0 ,5)
Optuna concepts
A study is an optimization session, i.e. a collection of trials, and a trial is a single call of the objective function with a specific set of suggested hyperparameter values.
Simple example:
"""
let's optimize a simple quadratic function: (x - 2)^2
"""
import optuna
# Objective function
def objective(trial):
x = trial.suggest_float("x", -10, 10)
return (x - 2) ** 2
study = optuna.create_study(study_name = "simple optimization example", direction="minimize")
study.optimize(objective, n_trials=100)
best_params = study.best_params
found_x = best_params["x"]
print("Found x: {}, (x - 2)^2: {}".format(found_x, (found_x - 2) ** 2))
study.best_params  # the best hyperparameter values found
study.best_value  # the best objective value
study.best_trial  # the best trial object
After this change, Hydra will use the Optuna sweeper whenever you use --multirun, as follows:
python src/app.py --multirun
Note: the function in src/app.py decorated with @hydra.main() should return a float which we want to minimize/maximize.
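A minimal sketch of such a function (a hypothetical src/app.py); the returned float is what the Optuna sweeper minimizes or maximizes across the --multirun trials:
import hydra
from omegaconf import DictConfig

@hydra.main(config_path="../configs", config_name="main", version_base=None)
def main(cfg: DictConfig) -> float:
    # ... train and evaluate a model using cfg here ...
    val_f1 = 0.9  # placeholder; in practice, compute this on your validation data
    return val_f1

if __name__ == "__main__":
    main()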
After you run the optimization, you might find the optimization_results.yaml file (i.e. the best params and best value) under the multirun logs folder.
Hydra provides an override parser that supports a rich syntax:
- interval is converted to UniformDistribution. You can use IntUniformDistribution, LogUniformDistribution, or IntLogUniformDistribution by casting the interval to int and tagging it with log.
python src/app.py --multirun 'x=int(interval(-5.0, 5.0))' 'y=tag(log, interval(1, 10))'
- range is converted to IntUniformDistribution. If you apply shuffle to range, CategoricalDistribution is used instead. If any of range’s start, stop, or step is of type float, it will be converted to DiscreteUniformDistribution.
python src/app.py --multirun 'x=range(-5.0, 5.0)' 'y=shuffle(range(-5, 5))'
- choice is converted to CategoricalDistribution.
python src/app.py --multirun 'x=choice(-5.0, 0.0, 5.0)' 'y=choice(0, 1, 2, 3, 4, 5)'
Here I will show a demo of how we can use such tools in the project.
In the project, you need to use Hydra config files to store/retrieve the hyperparameters and settings of models and data. You can also use the joblib and optuna plugins to speed up hyperparameter optimization, but be careful of race conditions when logging to the same run.
# The line below is the filename and should be stored in the root directory of the repository.
# MLproject
name: <Project Title>
# python_env: python_env.yaml
entry_points:
main:
command: "python src/main.py" # --multirun"
evaluate:
command: "python src/evaluate.py"
# configs/main.yaml
defaults:
- _self_
- data/sample
- data_version
- model/model
- experiment
# - override hydra/launcher: joblib # submitit_local #joblib
# - override hydra/sweeper: optuna
# - override hydra/sweeper/sampler: grid
hydra:
mode: MULTIRUN
# launcher:
# n_jobs: -1
sweeper:
params:
+model: "rf, lr"
# +model: "rf"
# configs/experiment.yaml
experiment_name: "mlops_experiment"
run_name: "multi_run"
test_size: 0.2
random_state: 88
cv_n_jobs: -1
train_data_version: "v4"
test_data_version: "v5"
# configs/data_version.yaml
data_version: v4
# configs/model/model.yaml
defaults:
- _self_
folds: 3
evaluation_metric: "f1"
cv_evaluation_metric: "mean_test_f1"
pyfunc_predict_fn: "predict_proba"
metrics:
accuracy: "accuracy"
f1: "f1"
# hydra:
# sweeper:
# sampler:
# seed: 8888
# # n_trials: 20 # number of times to try to optimize the search space
# direction: maximize # minimize
# configs/model/lr.yaml
model_name: logistic_regression
artifact_path: basic_lr
tag_key: "model"
tag_value: "basic LR"
module_name: "sklearn.linear_model"
class_name: "LogisticRegression"
params:
# penalty: ['l1', 'l2']
solver: ["saga", "lbfgs", "liblinear"]
max_iter: [100, 200, 1000]
random_state: [88] #, 100, 44]
C: [0.1, 0.5, 0.9]
# C: [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]
# C: np.arange(0.1, 1, 0.1)
# hydra:
# sweeper:
# params:
# +params.penalty: "'l1', 'l2'"
# +params.solver: "'saga'"
# +params.max_iter: "100, 200, 1000"
# +params.random_state: 88
# +params.C: range(start=0.1, stop=1, step=0.1)
# configs/model/rf.yaml
model_name: random_forest
artifact_path: basic_rf
tag_key: "model"
tag_value: "basic RF"
module_name: "sklearn.ensemble"
class_name: "RandomForestClassifier"
params:
n_estimators: [100, 200, 500]
criterion: ['gini', 'entropy', 'log_loss']
random_state: [88] #, 100, 44]
# hydra:
# sweeper:
# params:
# +params.n_estimators: "100, 200, 500"
# +params.criterion: "'gini', 'entropy', 'log_loss'"
# +params.random_state: 88
# src/model.py
import warnings
warnings.filterwarnings('ignore')
from sklearn.model_selection import GridSearchCV
from zenml.client import Client
import pandas as pd
import mlflow
import mlflow.sklearn
import importlib
def load_features(name, version, size = 1):
client = Client()
l = client.list_artifact_versions(name = name, tag = version, sort_by="version").items
l.reverse()
df = l[0].load()
df = df.sample(frac = size, random_state = 88)
print("size of df is ", df.shape)
print("df columns: ", df.columns)
X = df[df.columns[:-1]]
y = df.y
print("shapes of X,y = ", X.shape, y.shape)
return X, y
def log_metadata(cfg, gs, X_train, y_train, X_test, y_test):
cv_results = pd.DataFrame(gs.cv_results_).filter(regex=r'std_|mean_|param_').sort_index(axis=1)
best_metrics_values = [result[1][gs.best_index_] for result in gs.cv_results_.items()]
best_metrics_keys = [metric for metric in gs.cv_results_]
best_metrics_dict = {k:v for k,v in zip(best_metrics_keys, best_metrics_values) if 'mean' in k or 'std' in k}
# print(cv_results, cv_results.columns)
params = best_metrics_dict
df_train = pd.concat([X_train, y_train], axis = 1)
df_test = pd.concat([X_test, y_test], axis = 1)
experiment_name = cfg.model.model_name + "_" + cfg.experiment_name
try:
# Create a new MLflow Experiment
experiment_id = mlflow.create_experiment(name=experiment_name)
except mlflow.exceptions.MlflowException as e:
experiment_id = mlflow.get_experiment_by_name(name=experiment_name).experiment_id # type: ignore
print("experiment-id : ", experiment_id)
cv_evaluation_metric = cfg.model.cv_evaluation_metric
run_name = "_".join([cfg.run_name, cfg.model.model_name, cfg.model.evaluation_metric, str(params[cv_evaluation_metric]).replace(".", "_")]) # type: ignore
print("run name: ", run_name)
if (mlflow.active_run()):
mlflow.end_run()
# Fake run
with mlflow.start_run():
pass
# Parent run
with mlflow.start_run(run_name = run_name, experiment_id = experiment_id) as run:
df_train_dataset = mlflow.data.pandas_dataset.from_pandas(df = df_train, targets = cfg.data.target_cols[0]) # type: ignore
df_test_dataset = mlflow.data.pandas_dataset.from_pandas(df = df_test, targets = cfg.data.target_cols[0]) # type: ignore
mlflow.log_input(df_train_dataset, "training")
mlflow.log_input(df_test_dataset, "testing")
# Log the hyperparameters
mlflow.log_params(gs.best_params_)
# Log the performance metrics
mlflow.log_metrics(best_metrics_dict)
# Set a tag that we can use to remind ourselves what this run was for
mlflow.set_tag(cfg.model.tag_key, cfg.model.tag_value)
# Infer the model signature
signature = mlflow.models.infer_signature(X_train, gs.predict(X_train))
# Log the model
model_info = mlflow.sklearn.log_model(
sk_model = gs.best_estimator_,
artifact_path = cfg.model.artifact_path,
signature = signature,
input_example = X_train.iloc[0].to_numpy(),
registered_model_name = cfg.model.model_name,
pyfunc_predict_fn = cfg.model.pyfunc_predict_fn
)
client = mlflow.client.MlflowClient()
client.set_model_version_tag(name = cfg.model.model_name, version=model_info.registered_model_version, key="source", value="best_Grid_search_model")
for index, result in cv_results.iterrows():
child_run_name = "_".join(['child', run_name, str(index)]) # type: ignore
with mlflow.start_run(run_name = child_run_name, experiment_id= experiment_id, nested=True): #, tags=best_metrics_dict):
ps = result.filter(regex='param_').to_dict()
ms = result.filter(regex='mean_').to_dict()
stds = result.filter(regex='std_').to_dict()
# Remove param_ from the beginning of the keys
ps = {k.replace("param_",""):v for (k,v) in ps.items()}
mlflow.log_params(ps)
mlflow.log_metrics(ms)
mlflow.log_metrics(stds)
# We will create the estimator at runtime
module_name = cfg.model.module_name # e.g. "sklearn.linear_model"
class_name = cfg.model.class_name # e.g. "LogisticRegression"
# Load "module.submodule.MyClass"
class_instance = getattr(importlib.import_module(module_name), class_name)
estimator = class_instance(**ps)
estimator.fit(X_train, y_train)
# from sklearn.model_selection import cross_val_score
# scores = cross_val_score(estimator=estimator,
# X_train,
# y_train,
# cv = cfg.model.folds,
# n_jobs=cfg.cv_n_jobs,
# scoring=cfg.model.cv_evaluation_metric)
# cv_evaluation_metric = scores.mean()
signature = mlflow.models.infer_signature(X_train, estimator.predict(X_train))
model_info = mlflow.sklearn.log_model(
sk_model = estimator,
artifact_path = cfg.model.artifact_path,
signature = signature,
input_example = X_train.iloc[0].to_numpy(),
registered_model_name = cfg.model.model_name,
pyfunc_predict_fn = cfg.model.pyfunc_predict_fn
)
model_uri = model_info.model_uri
loaded_model = mlflow.sklearn.load_model(model_uri=model_uri)
predictions = loaded_model.predict(X_test) # type: ignore
eval_data = pd.DataFrame(y_test)
eval_data.columns = ["label"]
eval_data["predictions"] = predictions
results = mlflow.evaluate(
data=eval_data,
model_type="classifier",
targets="label",
predictions="predictions",
evaluators=["default"]
)
print(f"metrics:\n{results.metrics}")
# mlflow.end_run()
# mlflow.end_run()
def train(X_train, y_train, cfg):
# Define the model hyperparameters
params = cfg.model.params
# Train the model
module_name = cfg.model.module_name # e.g. "sklearn.linear_model"
class_name = cfg.model.class_name # e.g. "LogisticRegression"
# We will create the estimator at runtime
import importlib
# Load "module.submodule.MyClass"
class_instance = getattr(importlib.import_module(module_name), class_name)
estimator = class_instance(**params)
# Grid search with cross validation
from sklearn.model_selection import StratifiedKFold
cv = StratifiedKFold(n_splits=cfg.model.folds, random_state=cfg.random_state, shuffle=True)
param_grid = dict(params)
scoring = list(cfg.model.metrics.values()) # ['balanced_accuracy', 'f1_weighted', 'precision', 'recall', 'roc_auc']
evaluation_metric = cfg.model.evaluation_metric
gs = GridSearchCV(
estimator = estimator,
param_grid = param_grid,
scoring = scoring,
n_jobs = cfg.cv_n_jobs,
refit = evaluation_metric,
cv = cv,
verbose = 1,
return_train_score = True
)
gs.fit(X_train, y_train)
return gs
def retrieve_model_with_alias(model_name, model_alias = "champion") -> mlflow.pyfunc.PyFuncModel:
best_model:mlflow.pyfunc.PyFuncModel = mlflow.pyfunc.load_model(model_uri=f"models:/{model_name}@{model_alias}")
# best_model
return best_model
def retrieve_model_with_version(model_name, model_version = "v1") -> mlflow.pyfunc.PyFuncModel:
best_model:mlflow.pyfunc.PyFuncModel = mlflow.pyfunc.load_model(model_uri=f"models:/{model_name}/{model_version}")
# best_model
return best_model
# src/main.py
import hydra
from model import train, load_features, log_metadata
from omegaconf import OmegaConf
def run(args):
cfg = args
train_data_version = cfg.train_data_version
X_train, y_train = load_features(name = "features_target", version=train_data_version)
test_data_version = cfg.test_data_version
X_test, y_test = load_features(name = "features_target", version=test_data_version)
# print(X_train.shape, X_test.shape, y_train.shape, y_test.shape)
gs = train(X_train, y_train, cfg=cfg)
log_metadata(cfg, gs, X_train, y_train, X_test, y_test)
@hydra.main(config_path="../configs", config_name="main", version_base=None) # type: ignore
def main(cfg=None):
# print(OmegaConf.to_yaml(cfg))
run(cfg)
if __name__=="__main__":
main()
You can run the project using the mlflow run command.
mlflow run . --env-manager=local
Here I am using the current virtual environment (--env-manager=local) for running the MLflow project, and it is enough for now.
Master’s students, who will probably work with PyTorch, can use the skorch package to wrap deep networks and benefit from the GridSearch functionality of sklearn. Check the links below for more info.
Note: The project tasks are graded, and they form the practice part of the course. We have tasks for the repository as well as for the report (for Master’s students).
Here, use only one version (e.g. the first) of your data samples for training your model.
- Create an MLproject file with one entry point main to run src/main.py with the --multirun option. You should decorate its main function with @hydra.main. We are using multirun to run multiple experiments by overriding parameters at runtime. The overridden parameters will be added to the configs/main.yaml file as explained in the demo above. You can also add overridden parameters in the MLproject file when you run the script, but I prefer the former approach.
- The main function in src/main.py will extract the features from the ZenML artifact store, train a model, evaluate it via cross-validation, and log the metadata. Keep your code clean and organize your project in different modules such that it is easy to maintain. For instance, you can create a function extract_data for the first task, another function train for the second task, evaluate for the third task, and log_metadata for the fourth task. Then you call them inside the main function. Try to put the functions which deal with data in the src/data.py module and the functions which deal with models in src/model.py.
- Data sample version v1 is used for training and validation, and v2 is used for testing the models.
- Use mlflow.evaluate to log the metrics. Log all hyperparameters you optimized.
- Name the runs using the evaluation metric value, e.g. acc_0.78322932. Use the same model name for registering all models and let the version increment.
- Use cross-validation with k=3 folds.
- Log the cross-validation runs as child runs (nested=True).
- Create a results folder in your repository. You can download artifacts (images, models, etc.) from the MLflow store as follows:
mlflow.artifacts.download_artifacts(artifact_uri, dst_path)
# OR
mlflow.artifacts.download_artifacts(run_id, artifact_path, dst_path)
# artifact_uri usually starts with runs:/<run_id>/path/to/artifact
- Save the evaluation results to the results folder as a text file.
- Assign the alias staging to the models you select. You can select more than one model per architecture. Assign an alias champion for the best model among all of these models and other aliases challenger1, challenger2, challenger3, etc. to the other models (an alias cannot be used more than once per registered model). It is enough to have one model as champion and one model as challenger.
- Store the selected models in the models folder.
- Add an entry point evaluate to the MLproject file to evaluate the selected model given the data sample version and model alias. By default, we assume that we evaluate the first sample on the champion model. These arguments can be passed to the function evaluate in src/evaluate.py using config files or directly in the MLproject file.
In production, the model that is currently deployed is called the Champion model, whereas the model(s) being tested in production are called the Challenger models. Here we do not have models in production, but we aim to deploy the champion model. For the next iterations of the project, we will use shadow testing to test the champion model (already in production) against challenger models (candidates to replace the champion model) on the same input data.
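A minimal sketch of assigning such aliases through the registry client, assuming a hypothetical registered model named random_forest with two versions already registered:
import mlflow.pyfunc
from mlflow import MlflowClient

client = MlflowClient()
client.set_registered_model_alias(name="random_forest", alias="champion", version="2")
client.set_registered_model_alias(name="random_forest", alias="challenger1", version="1")

# Later, the champion model can be loaded through its alias
champion = mlflow.pyfunc.load_model("models:/random_forest@champion")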
You should use Hydra for configuration management and running multiple experiments.
Complete the following chapters: